d05d5d797b507fe2176df9199cd884dc5203df05,src/main/java/com/github/ddth/kafka/internal/KafkaHelper.java,KafkaHelper,seekToEnd,#KafkaConsumer#String#,64
Before Change
List<PartitionInfo> partInfo = consumer.partitionsFor(topic);
for (PartitionInfo p : partInfo) {
TopicPartition tp = new TopicPartition(topic, p.partition());
consumer.assign(Arrays.asList(tp));
consumer.seekToEnd(tp);
consumer.position(tp);
consumer.commitSync();
}
} finally {
consumer.unsubscribe();
After Change
* @param consumer
* @param topic
*/
public static void seekToEnd(final KafkaConsumer<?, ?> consumer, final String topic) {
synchronized (consumer) {
// first, save the current subscription
Set<String> subscription = consumer.subscription();
try {
// second, unsubscribe and re-subscribe to all partitions.
consumer.unsubscribe();
List<PartitionInfo> partInfo = consumer.partitionsFor(topic);
Collection<TopicPartition> tpList = new ArrayList<>();
for (PartitionInfo p : partInfo) {
TopicPartition tp = new TopicPartition(topic, p.partition());
tpList.add(tp);
}
consumer.assign(tpList);
consumer.seekToEnd(tpList);
// we want to seek as soon as possible
for (TopicPartition tp : tpList) {
// since seekToEnd evaluates lazily, invoke position() so
// that seeking will be committed.
consumer.position(tp);
}
consumer.commitSync();
} finally {
// finally, restore subscription
consumer.unsubscribe();